/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.schemas;



import com.bff.gaia.unified.sdk.values.Row;

import com.bff.gaia.unified.sdk.values.TypeDescriptor;

import com.bff.gaia.unified.sdk.values.TypeDescriptors;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.BiMap;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableBiMap;

import org.joda.time.Instant;



import java.lang.reflect.ParameterizedType;

import java.util.Collection;

import java.util.Map;



import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;



/**

 * Utilities for converting between {@link Schema} field types and {@link TypeDescriptor}s that

 * define Java objects which can represent these field types.

 */

public class FieldTypeDescriptors {

  private static final BiMap<Schema.TypeName, TypeDescriptor> PRIMITIVE_MAPPING =

      ImmutableBiMap.<Schema.TypeName, TypeDescriptor>builder()

          .put(Schema.TypeName.BYTE, TypeDescriptors.bytes())

          .put(Schema.TypeName.INT16, TypeDescriptors.shorts())

          .put(Schema.TypeName.INT32, TypeDescriptors.integers())

          .put(Schema.TypeName.INT64, TypeDescriptors.longs())

          .put(Schema.TypeName.DECIMAL, TypeDescriptors.bigdecimals())

          .put(Schema.TypeName.FLOAT, TypeDescriptors.floats())

          .put(Schema.TypeName.DOUBLE, TypeDescriptors.doubles())

          .put(Schema.TypeName.STRING, TypeDescriptors.strings())

          .put(Schema.TypeName.DATETIME, TypeDescriptor.of(Instant.class))

          .put(Schema.TypeName.BOOLEAN, TypeDescriptors.booleans())

          .put(Schema.TypeName.BYTES, TypeDescriptor.of(byte[].class))

          .build();

  /** Get a {@link TypeDescriptor} from a {@link Schema.FieldType}. */

  public static TypeDescriptor javaTypeForFieldType(Schema.FieldType fieldType) {

    switch (fieldType.getTypeName()) {

      case LOGICAL_TYPE:

        // TODO: shouldn't we handle this differently?

        return javaTypeForFieldType(fieldType.getLogicalType().getBaseType());

      case ARRAY:

        return TypeDescriptors.lists(javaTypeForFieldType(fieldType.getCollectionElementType()));

      case MAP:

        return TypeDescriptors.maps(

            javaTypeForFieldType(fieldType.getMapKeyType()),

            javaTypeForFieldType(fieldType.getMapValueType()));

      case ROW:

        return TypeDescriptors.rows();

      default:

        return PRIMITIVE_MAPPING.get(fieldType.getTypeName());

    }

  }

  /** Get a {@link Schema.FieldType} from a {@link TypeDescriptor}. */

  public static Schema.FieldType fieldTypeForJavaType(TypeDescriptor typeDescriptor) {

    // TODO: Convert for registered logical types.

    if (typeDescriptor.isArray()

        || typeDescriptor.isSubtypeOf(TypeDescriptor.of(Collection.class))) {

      return getArrayFieldType(typeDescriptor);

    } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Map.class))) {

      return getMapFieldType(typeDescriptor);

    } else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Row.class))) {

      throw new IllegalArgumentException(

          "Cannot automatically determine a field type from a Row class"

              + " as we cannot determine the schema. You should set a field type explicitly.");

    } else {

      Schema.TypeName typeName = PRIMITIVE_MAPPING.inverse().get(typeDescriptor);

      if (typeName == null) {

        throw new RuntimeException("Couldn't find field type for " + typeDescriptor);

      }

      return Schema.FieldType.of(typeName);

    }

  }



  private static Schema.FieldType getArrayFieldType(TypeDescriptor typeDescriptor) {

    if (typeDescriptor.isArray()) {

      if (typeDescriptor.getComponentType().getType().equals(byte.class)) {

        return Schema.FieldType.BYTES;

      } else {

        return Schema.FieldType.array(fieldTypeForJavaType(typeDescriptor.getComponentType()));

      }

    }

    if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(Collection.class))) {

      TypeDescriptor<Collection<?>> collection = typeDescriptor.getSupertype(Collection.class);

      if (collection.getType() instanceof ParameterizedType) {

        ParameterizedType ptype = (ParameterizedType) collection.getType();

        java.lang.reflect.Type[] params = ptype.getActualTypeArguments();

        checkArgument(params.length == 1);

        return Schema.FieldType.array(fieldTypeForJavaType(TypeDescriptor.of(params[0])));

      }

    }

    throw new RuntimeException("Could not determine array parameter type for field.");

  }



  private static Schema.FieldType getMapFieldType(TypeDescriptor typeDescriptor) {

    TypeDescriptor<Collection<?>> map = typeDescriptor.getSupertype(Map.class);

    if (map.getType() instanceof ParameterizedType) {

      ParameterizedType ptype = (ParameterizedType) map.getType();

      java.lang.reflect.Type[] params = ptype.getActualTypeArguments();

      return Schema.FieldType.map(

          fieldTypeForJavaType(TypeDescriptor.of(params[0])),

          fieldTypeForJavaType(TypeDescriptor.of(params[1])));

    }

    throw new RuntimeException("Cound not determine array parameter type for field.");

  }

}